refactor: update AsyncQueueManager to use a single RabbitMQ priority queue and enhance connection handling; add default queue name and priority mapping#257
Conversation
…queue and enhance connection handling; add default queue name and priority mapping
📝 WalkthroughWalkthroughRefactored AsyncQueueManager to consolidate per-priority RabbitMQ queues into a single priority-enabled queue, replacing inefficient polling with push-based async iterator consumption, adding explicit type hints and improved logging throughout. Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@smokeyScraper This should solve the multiple Queue issue and utilise resources more properly |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/core/orchestration/queue_manager.py (1)
145-165: Critical: Swallowed exceptions cause failed messages to be acknowledged.When a handler raises an exception,
_process_itemcatches it (line 164), logs it, and returns normally. Back in_worker, execution continues tomessage.ack()(line 134), so the failed message is acknowledged and lost instead of being nacked.The exception must be re-raised so the worker's exception handler can properly nack the message.
Proposed fix
async def _process_item(self, item: Dict[str, Any], worker_name: str) -> None: """Process a queue item by message type.""" - try: - message_data = item["data"] - message_type = message_data.get("type", "unknown") + message_data = item["data"] + message_type = message_data.get("type", "unknown") - handler = self.handlers.get(message_type) + handler = self.handlers.get(message_type) - if handler: - logger.debug( - f"Worker {worker_name} processing {item['id']} (type: {message_type})" - ) - if asyncio.iscoroutinefunction(handler): - await handler(message_data) - else: - handler(message_data) + if handler: + logger.debug( + f"Worker {worker_name} processing {item['id']} (type: {message_type})" + ) + if asyncio.iscoroutinefunction(handler): + await handler(message_data) else: - logger.warning(f"No handler found for message type: {message_type}") - - except Exception as e: - logger.error(f"Error processing item {item.get('id', 'unknown')}: {str(e)}") + handler(message_data) + else: + logger.warning(f"No handler found for message type: {message_type}")By removing the try/except wrapper, any handler exception will propagate to
_worker, which will then log it and callmessage.nack(requeue=False).
🤖 Fix all issues with AI agents
In `@backend/app/core/orchestration/queue_manager.py`:
- Around line 87-109: The enqueue method can be called when the manager is not
connected (self.channel is None), so add a guard at the start of enqueue (in
function enqueue) that checks connection state (e.g., self.channel is not None
and not closed or a self.started flag set by start/stop) and if not connected
raise a clear exception (ConnectionError or RuntimeError) and/or log an error
before returning; ensure this check is applied before calling
self.channel.default_exchange.publish and reference self.queue_name in the error
message for context.
🧹 Nitpick comments (2)
backend/app/core/orchestration/queue_manager.py (2)
57-59: Uselogging.exceptionto capture stack trace.When logging errors in exception handlers,
logging.exceptionautomatically includes the stack trace, which aids debugging.Proposed fix
except Exception as e: - logger.error(f"Failed to connect to RabbitMQ: {e}") + logger.exception(f"Failed to connect to RabbitMQ: {e}") raise
135-143: Uselogging.exceptionfor error handlers to capture stack traces.Per static analysis hints,
logging.exceptionautomatically includes the traceback, which is helpful for debugging processing and worker errors.Proposed fix
except asyncio.CancelledError: raise except Exception as e: - logger.error(f"Error processing message: {e}") + logger.exception(f"Error processing message: {e}") await message.nack(requeue=False) except asyncio.CancelledError: logger.info(f"Worker {worker_name} cancelled") except Exception as e: - logger.error(f"Worker {worker_name} error: {e}") + logger.exception(f"Worker {worker_name} error: {e}")
| async def enqueue( | ||
| self, | ||
| message: Dict[str, Any], | ||
| priority: QueuePriority = QueuePriority.MEDIUM, | ||
| delay: float = 0, | ||
| ) -> None: | ||
| """Add a message to the single priority queue.""" | ||
| if delay > 0: | ||
| await asyncio.sleep(delay) | ||
|
|
||
| queue_item = { | ||
| "id": message.get("id", f"msg_{datetime.now().timestamp()}"), | ||
| "priority": priority, | ||
| "data": message | ||
| "priority": priority.value, | ||
| "data": message, | ||
| } | ||
| json_message = json.dumps(queue_item).encode() | ||
| json_body = json.dumps(queue_item).encode() | ||
| numeric_priority = PRIORITY_MAP[priority] | ||
|
|
||
| await self.channel.default_exchange.publish( | ||
| aio_pika.Message(body=json_message), | ||
| routing_key=self.queues[priority] | ||
| aio_pika.Message(body=json_body, priority=numeric_priority), | ||
| routing_key=self.queue_name, | ||
| ) | ||
| logger.info(f"Enqueued message {queue_item['id']} with priority {priority}") |
There was a problem hiding this comment.
Guard against publishing when not connected.
If enqueue() is called before start() or after stop(), self.channel will be None, causing an AttributeError on self.channel.default_exchange.publish(). Add a connection check to provide a clear error.
Proposed fix
async def enqueue(
self,
message: Dict[str, Any],
priority: QueuePriority = QueuePriority.MEDIUM,
delay: float = 0,
) -> None:
"""Add a message to the single priority queue."""
+ if not self.channel:
+ raise RuntimeError("Cannot enqueue: not connected. Call start() first.")
+
if delay > 0:
await asyncio.sleep(delay)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| async def enqueue( | |
| self, | |
| message: Dict[str, Any], | |
| priority: QueuePriority = QueuePriority.MEDIUM, | |
| delay: float = 0, | |
| ) -> None: | |
| """Add a message to the single priority queue.""" | |
| if delay > 0: | |
| await asyncio.sleep(delay) | |
| queue_item = { | |
| "id": message.get("id", f"msg_{datetime.now().timestamp()}"), | |
| "priority": priority, | |
| "data": message | |
| "priority": priority.value, | |
| "data": message, | |
| } | |
| json_message = json.dumps(queue_item).encode() | |
| json_body = json.dumps(queue_item).encode() | |
| numeric_priority = PRIORITY_MAP[priority] | |
| await self.channel.default_exchange.publish( | |
| aio_pika.Message(body=json_message), | |
| routing_key=self.queues[priority] | |
| aio_pika.Message(body=json_body, priority=numeric_priority), | |
| routing_key=self.queue_name, | |
| ) | |
| logger.info(f"Enqueued message {queue_item['id']} with priority {priority}") | |
| async def enqueue( | |
| self, | |
| message: Dict[str, Any], | |
| priority: QueuePriority = QueuePriority.MEDIUM, | |
| delay: float = 0, | |
| ) -> None: | |
| """Add a message to the single priority queue.""" | |
| if not self.channel: | |
| raise RuntimeError("Cannot enqueue: not connected. Call start() first.") | |
| if delay > 0: | |
| await asyncio.sleep(delay) | |
| queue_item = { | |
| "id": message.get("id", f"msg_{datetime.now().timestamp()}"), | |
| "priority": priority.value, | |
| "data": message, | |
| } | |
| json_body = json.dumps(queue_item).encode() | |
| numeric_priority = PRIORITY_MAP[priority] | |
| await self.channel.default_exchange.publish( | |
| aio_pika.Message(body=json_body, priority=numeric_priority), | |
| routing_key=self.queue_name, | |
| ) | |
| logger.info(f"Enqueued message {queue_item['id']} with priority {priority}") |
🤖 Prompt for AI Agents
In `@backend/app/core/orchestration/queue_manager.py` around lines 87 - 109, The
enqueue method can be called when the manager is not connected (self.channel is
None), so add a guard at the start of enqueue (in function enqueue) that checks
connection state (e.g., self.channel is not None and not closed or a
self.started flag set by start/stop) and if not connected raise a clear
exception (ConnectionError or RuntimeError) and/or log an error before
returning; ensure this check is applied before calling
self.channel.default_exchange.publish and reference self.queue_name in the error
message for context.
Pull Request
Closes #251
📝 Description
Refactors
AsyncQueueManagerto fix inefficient queue polling and make it production-ready. Replaces three separate queues (high_task_queue,medium_task_queue,low_task_queue) with a single RabbitMQ priority queue and push-based consumption. Workers no longer poll withqueue.get()and a fixed sleep; priority is enforced by the broker viax-max-priorityand per-messagepriority, and workers usequeue.iterator()so the broker pushes messages instead of the app polling.🔧 Changes Made
task_queue) declared withx-max-priority: 10. All messages go to this queue with a numeric priority (HIGH=10, MEDIUM=5, LOW=1).queue.get(). Each worker usesasync with queue.iterator(): async for message in queue_iterso the broker pushes messages; no polling orasyncio.sleep(0.1).channel.set_qos(prefetch_count=1)so the broker doesn’t over-deliver; each consumer has at most one unacked message.enqueue()publishes to the single queue withaio_pika.Message(..., priority=numeric_priority)and the samerouting_key. Public API unchanged:enqueue(message, priority=QueuePriority.MEDIUM, delay=0).stop()clearsworker_tasksaftergatherand closes channel/connection. Workers break out of the iterator when cancelled or whenself.runningis False.📷 Screenshots or Visual Changes (if applicable)
N/A
🤝 Collaboration
N/A
✅ Checklist
Summary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.